aws_stream_filter_framerate
"""Downsample timestamped JSON-lines recordings to a target framerate.

The module builds a boto3 S3 client at import time from environment
variables (optionally loaded from a ``.env`` file) and exposes
``SampleTimestamps``, which parses timestamps, measures the original
framerate, selects records near a regular target grid and uploads the
reduced file to S3.
"""

import datetime
import json
import os
import shutil

import boto3
import numpy as np
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()

S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "mcity-data-engine")
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
region_name = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")

session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name,
)
s3 = session.client("s3")


class SampleTimestamps:
    """Reduce a timestamped JSON-lines file to a target framerate."""

    def __init__(
        self, file_path: str, target_framerate_hz=1, aws_bucket=None, aws_prefix=None
    ):
        """Store the configuration and pick the execution mode.

        Args:
            file_path: Path of the local JSON-lines file; a falsy value
                selects the AWS mode instead.
            target_framerate_hz: Desired framerate after sampling, in Hz.
            aws_bucket: Bucket name, required together with ``aws_prefix``
                when ``file_path`` is falsy.
            aws_prefix: Key prefix inside ``aws_bucket``.

        Raises:
            ValueError: If neither ``file_path`` nor both AWS arguments
                are provided.
        """
        self.file_path = file_path
        self.aws_bucket = aws_bucket
        self.aws_prefix = aws_prefix
        self.target_framerate_hz = target_framerate_hz
        self.valid_target_framerate = True
        self.current_framerate_hz = None
        self.timestamps = []

        if not file_path and not (aws_bucket and aws_prefix):
            raise ValueError(
                "Either file_path or aws_bucket and aws_prefix must be provided"
            )
        # A local file takes precedence over the AWS location.
        self.execution_mode = "local" if file_path else "aws"

    def get_timestamps(self):
        """Parse ``self.file_path`` into ``(line_index, timestamp)`` pairs.

        Two record layouts are recognised:

        * ``"time"`` + ``"data"`` keys: ``"time"`` is parsed as
          ``%Y-%m-%d %H:%M:%S.%f`` with a fallback to the same format
          without fractional seconds (naive datetimes).
        * ``"image"`` + ``"sensor_name"`` + ``"event_timestamp"`` keys:
          ``"event_timestamp"`` is read as a POSIX timestamp and converted
          to a timezone-aware UTC datetime.

        Blank lines and records matching neither layout are skipped, but
        still count towards the line index.

        Returns:
            List of ``(line_index, datetime)`` tuples in file order.

        Raises:
            ValueError: If a line is not valid JSON or a ``"time"`` value
                matches neither format.
        """
        timestamps = []

        with open(self.file_path, "r") as file:
            for index, line in enumerate(file):
                if not line.strip():
                    continue  # e.g. a trailing empty line

                data = json.loads(line)
                if "time" in data and "data" in data:
                    timestamp_raw = data.get("time")
                    try:
                        # Most timestamps carry fractional seconds
                        timestamp = datetime.datetime.strptime(
                            timestamp_raw, "%Y-%m-%d %H:%M:%S.%f"
                        )
                    except ValueError:
                        # Timestamps without fractional seconds
                        timestamp = datetime.datetime.strptime(
                            timestamp_raw, "%Y-%m-%d %H:%M:%S"
                        )
                elif (
                    "image" in data
                    and "sensor_name" in data
                    and "event_timestamp" in data
                ):
                    timestamp = datetime.datetime.fromtimestamp(
                        data.get("event_timestamp"), tz=datetime.timezone.utc
                    )
                else:
                    # Unknown layout: there is no timestamp to record.
                    continue

                timestamps.append((index, timestamp))

        return timestamps

    def get_framerate(self, timestamps, log):
        """Estimate the original framerate and an outlier threshold.

        Args:
            timestamps: Iterable of ``(index, datetime)`` tuples.
            log: Dict that receives the gap statistics (keys prefixed
                ``time_s_``), ``framerate_hz_original``,
                ``framerate_hz_target`` and ``upper_bound_threshold``.

        Returns:
            Tuple ``(current_framerate_hz, timestamps, threshold)`` where
            ``timestamps`` is the input sorted by time and ``threshold`` is
            ``Q3 + 1.5 * IQR`` of the gaps between consecutive timestamps.

        Raises:
            ValueError: If fewer than two timestamps are given.
        """
        timestamps = sorted(timestamps, key=lambda x: x[1])
        if len(timestamps) < 2:
            raise ValueError(
                "At least two timestamps are required to compute a framerate, "
                f"got {len(timestamps)}"
            )

        # Gaps (s) between consecutive timestamps
        time_differences = [
            (later[1] - earlier[1]).total_seconds()
            for earlier, later in zip(timestamps, timestamps[1:])
        ]

        # Statistics about timestamp distribution
        average_time_diff = np.mean(time_differences)
        median_time_diff = np.median(time_differences)
        std_time_diff = np.std(time_differences)
        min_time_diff = np.min(time_differences)
        max_time_diff = np.max(time_differences)
        range_time_diff = max_time_diff - min_time_diff
        q1_time_diff = np.percentile(time_differences, 25)
        q3_time_diff = np.percentile(time_differences, 75)

        # Median is more robust to outliers than the mean. A zero median
        # (mostly duplicated timestamps) means an unbounded framerate.
        if median_time_diff > 0:
            current_framerate_hz = 1 / median_time_diff
        else:
            current_framerate_hz = float("inf")

        log["time_s_avg_between_timestamps"] = average_time_diff
        log["time_s_median_between_timestamps"] = median_time_diff
        log["time_s_std_between_timestamps"] = std_time_diff
        log["time_s_min_between_timestamps"] = min_time_diff
        log["time_s_max_between_timestamps"] = max_time_diff
        log["time_s_range_between_timestamps"] = range_time_diff
        log["time_s_25_percentile_between_timestamps"] = q1_time_diff
        log["time_s_75_percentile_between_timestamps"] = q3_time_diff
        log["framerate_hz_original"] = current_framerate_hz
        log["framerate_hz_target"] = self.target_framerate_hz

        # Upper outlier bound according to the 1.5 * IQR rule
        interquartile_range = q3_time_diff - q1_time_diff
        upper_bound_threshold = q3_time_diff + 1.5 * interquartile_range
        log["upper_bound_threshold"] = upper_bound_threshold

        return current_framerate_hz, timestamps, upper_bound_threshold

    def check_target_framerate(self, current_framerate_hz, log):
        """Report whether the target framerate is reachable.

        Args:
            current_framerate_hz: Framerate of the original data in Hz.
            log: Dict that receives the boolean ``framerate_target_ok``.

        Returns:
            ``False`` (after printing a notice) when the target framerate
            is greater than ``current_framerate_hz``, otherwise ``True``.
        """
        exceeds_original = self.target_framerate_hz > current_framerate_hz
        if exceeds_original:
            print(
                f"Target framerate of {self.target_framerate_hz} Hz cannot exceed original framerate of {current_framerate_hz} Hz"
            )
        log["framerate_target_ok"] = not exceeds_original
        return not exceeds_original

    def sample_timestamps(self, timestamps, threshold_to_target, log):
        """Pick the timestamps closest to a regular target-framerate grid.

        The grid starts at the first timestamp and advances by
        ``1 / self.target_framerate_hz`` seconds up to the last timestamp.
        For every grid point the nearest timestamp is selected if it lies
        within ``threshold_to_target`` seconds and was not selected before.

        Args:
            timestamps: List of ``(index, datetime)`` tuples; the first
                and last entries delimit the grid.
            threshold_to_target: Maximum distance (s) between a grid point
                and the timestamp selected for it.
            log: Dict that receives ``n_original_timestamps``,
                ``n_target_timestamps``, ``n_selected_timestamps`` and,
                with at least two selections, ``framerate_hz_sampled``.

        Returns:
            Tuple ``(selected_indices, selected_timestamps,
            target_timestamps, selected_target_timestamps)``.
            ``selected_indices`` are positions within ``timestamps``, not
            the index stored in each tuple.

        Raises:
            ValueError: If the target framerate yields a grid step that
                does not advance in time.
        """
        target_timestamps = []
        selected_indices = []
        selected_timestamps = []
        selected_target_timestamps = []

        if timestamps:
            step = datetime.timedelta(seconds=1 / self.target_framerate_hz)
            if step <= datetime.timedelta(0):
                # Negative rates, or rates above timedelta's microsecond
                # resolution, would never reach the end of the grid.
                raise ValueError(
                    f"Target framerate of {self.target_framerate_hz} Hz does not "
                    "produce a positive time step"
                )

            # Generate target timestamps
            end_time = timestamps[-1][1]
            current_time = timestamps[0][1]
            while current_time <= end_time:
                target_timestamps.append(current_time)
                current_time += step

            already_selected = set()
            for target in target_timestamps:
                # Absolute distance (s) of every timestamp to the grid point
                time_diffs = np.abs(
                    [(target - t).total_seconds() for _, t in timestamps]
                )
                nearest_index = int(np.argmin(time_diffs))

                # Ensure no duplicates are selected
                if (
                    time_diffs[nearest_index] <= threshold_to_target
                    and nearest_index not in already_selected
                ):
                    already_selected.add(nearest_index)
                    selected_target_timestamps.append(target)
                    selected_indices.append(nearest_index)
                    selected_timestamps.append(timestamps[nearest_index])

        # Log statistics
        log["n_original_timestamps"] = len(timestamps)
        log["n_target_timestamps"] = len(target_timestamps)
        log["n_selected_timestamps"] = len(selected_timestamps)

        if len(selected_timestamps) >= 2:
            # Framerate of the selection, from the median gap
            timestamps_new = sorted(selected_timestamps, key=lambda x: x[1])
            time_differences_new = [
                (later[1] - earlier[1]).total_seconds()
                for earlier, later in zip(timestamps_new, timestamps_new[1:])
            ]
            median_time_diff_new = np.median(time_differences_new)
            if median_time_diff_new > 0:
                log["framerate_hz_sampled"] = 1 / median_time_diff_new
            else:
                log["framerate_hz_sampled"] = float("inf")
        else:
            print(f"Not enough selected timestamps ({len(selected_timestamps)}) to compute new framerate. Original number of timestamps: {len(timestamps)}")

        return (
            selected_indices,
            selected_timestamps,
            target_timestamps,
            selected_target_timestamps,
        )

    def update_upload_file(self, file_name, selected_indices, aws_source_path):
        """Write the selected lines to a new file and upload it to S3.

        The output is written next to ``file_name`` with the suffix
        ``_sampled_<target_framerate_hz>Hz`` and uploaded to
        ``S3_BUCKET_NAME`` under
        ``<target_framerate_hz>/<aws_source_path>_<output file name>``.

        Args:
            file_name: Path of the source file.
            selected_indices: Zero-based line numbers of ``file_name``
                to keep.
            aws_source_path: String prepended to the output file name in
                the S3 key.

        Returns:
            Size of the written file in MiB, or ``None`` if the upload
            step raised (the error is printed).
        """
        output_file_path = file_name + f"_sampled_{self.target_framerate_hz}Hz"

        # Set membership keeps the filter linear in the number of lines.
        wanted = set(selected_indices)
        with open(file_name, "r") as file:
            lines = [line for index, line in enumerate(file) if index in wanted]

        with open(output_file_path, "w") as output_file:
            output_file.writelines(lines)

        tail = os.path.basename(output_file_path)
        try:
            file_size_mb = os.path.getsize(output_file_path) / (1024 * 1024)
            s3.upload_file(
                output_file_path,
                S3_BUCKET_NAME,
                str(self.target_framerate_hz) + "/" + aws_source_path + "_" + tail,
            )
            return file_size_mb
        except Exception as e:
            # Best effort: report the failure and let the caller carry on.
            print(
                "S3 upload failed for file "
                + str(str(self.target_framerate_hz) + "/" + tail)
                + " - "
                + str(e)
            )
            return None
S3_BUCKET_NAME =
'mcity-data-engine'
aws_access_key_id =
None
aws_secret_access_key =
None
region_name =
'us-east-1'
session =
Session(region_name='us-east-1')
s3 =
<botocore.client.S3 object>
class
SampleTimestamps:
class SampleTimestamps:
    """Reduce a timestamped JSON-lines file to a target framerate."""

    def __init__(
        self, file_path: str, target_framerate_hz=1, aws_bucket=None, aws_prefix=None
    ):
        """Store the configuration and pick the execution mode.

        Args:
            file_path: Path of the local JSON-lines file; a falsy value
                selects the AWS mode instead.
            target_framerate_hz: Desired framerate after sampling, in Hz.
            aws_bucket: Bucket name, required together with ``aws_prefix``
                when ``file_path`` is falsy.
            aws_prefix: Key prefix inside ``aws_bucket``.

        Raises:
            ValueError: If neither ``file_path`` nor both AWS arguments
                are provided.
        """
        self.file_path = file_path
        self.aws_bucket = aws_bucket
        self.aws_prefix = aws_prefix
        self.target_framerate_hz = target_framerate_hz
        self.valid_target_framerate = True
        self.current_framerate_hz = None
        self.timestamps = []

        if not file_path and not (aws_bucket and aws_prefix):
            raise ValueError(
                "Either file_path or aws_bucket and aws_prefix must be provided"
            )
        # A local file takes precedence over the AWS location.
        self.execution_mode = "local" if file_path else "aws"

    def get_timestamps(self):
        """Parse ``self.file_path`` into ``(line_index, timestamp)`` pairs.

        Two record layouts are recognised:

        * ``"time"`` + ``"data"`` keys: ``"time"`` is parsed as
          ``%Y-%m-%d %H:%M:%S.%f`` with a fallback to the same format
          without fractional seconds (naive datetimes).
        * ``"image"`` + ``"sensor_name"`` + ``"event_timestamp"`` keys:
          ``"event_timestamp"`` is read as a POSIX timestamp and converted
          to a timezone-aware UTC datetime.

        Blank lines and records matching neither layout are skipped, but
        still count towards the line index.

        Returns:
            List of ``(line_index, datetime)`` tuples in file order.

        Raises:
            ValueError: If a line is not valid JSON or a ``"time"`` value
                matches neither format.
        """
        timestamps = []

        with open(self.file_path, "r") as file:
            for index, line in enumerate(file):
                if not line.strip():
                    continue  # e.g. a trailing empty line

                data = json.loads(line)
                if "time" in data and "data" in data:
                    timestamp_raw = data.get("time")
                    try:
                        # Most timestamps carry fractional seconds
                        timestamp = datetime.datetime.strptime(
                            timestamp_raw, "%Y-%m-%d %H:%M:%S.%f"
                        )
                    except ValueError:
                        # Timestamps without fractional seconds
                        timestamp = datetime.datetime.strptime(
                            timestamp_raw, "%Y-%m-%d %H:%M:%S"
                        )
                elif (
                    "image" in data
                    and "sensor_name" in data
                    and "event_timestamp" in data
                ):
                    timestamp = datetime.datetime.fromtimestamp(
                        data.get("event_timestamp"), tz=datetime.timezone.utc
                    )
                else:
                    # Unknown layout: there is no timestamp to record.
                    continue

                timestamps.append((index, timestamp))

        return timestamps

    def get_framerate(self, timestamps, log):
        """Estimate the original framerate and an outlier threshold.

        Args:
            timestamps: Iterable of ``(index, datetime)`` tuples.
            log: Dict that receives the gap statistics (keys prefixed
                ``time_s_``), ``framerate_hz_original``,
                ``framerate_hz_target`` and ``upper_bound_threshold``.

        Returns:
            Tuple ``(current_framerate_hz, timestamps, threshold)`` where
            ``timestamps`` is the input sorted by time and ``threshold`` is
            ``Q3 + 1.5 * IQR`` of the gaps between consecutive timestamps.

        Raises:
            ValueError: If fewer than two timestamps are given.
        """
        timestamps = sorted(timestamps, key=lambda x: x[1])
        if len(timestamps) < 2:
            raise ValueError(
                "At least two timestamps are required to compute a framerate, "
                f"got {len(timestamps)}"
            )

        # Gaps (s) between consecutive timestamps
        time_differences = [
            (later[1] - earlier[1]).total_seconds()
            for earlier, later in zip(timestamps, timestamps[1:])
        ]

        # Statistics about timestamp distribution
        average_time_diff = np.mean(time_differences)
        median_time_diff = np.median(time_differences)
        std_time_diff = np.std(time_differences)
        min_time_diff = np.min(time_differences)
        max_time_diff = np.max(time_differences)
        range_time_diff = max_time_diff - min_time_diff
        q1_time_diff = np.percentile(time_differences, 25)
        q3_time_diff = np.percentile(time_differences, 75)

        # Median is more robust to outliers than the mean. A zero median
        # (mostly duplicated timestamps) means an unbounded framerate.
        if median_time_diff > 0:
            current_framerate_hz = 1 / median_time_diff
        else:
            current_framerate_hz = float("inf")

        log["time_s_avg_between_timestamps"] = average_time_diff
        log["time_s_median_between_timestamps"] = median_time_diff
        log["time_s_std_between_timestamps"] = std_time_diff
        log["time_s_min_between_timestamps"] = min_time_diff
        log["time_s_max_between_timestamps"] = max_time_diff
        log["time_s_range_between_timestamps"] = range_time_diff
        log["time_s_25_percentile_between_timestamps"] = q1_time_diff
        log["time_s_75_percentile_between_timestamps"] = q3_time_diff
        log["framerate_hz_original"] = current_framerate_hz
        log["framerate_hz_target"] = self.target_framerate_hz

        # Upper outlier bound according to the 1.5 * IQR rule
        interquartile_range = q3_time_diff - q1_time_diff
        upper_bound_threshold = q3_time_diff + 1.5 * interquartile_range
        log["upper_bound_threshold"] = upper_bound_threshold

        return current_framerate_hz, timestamps, upper_bound_threshold

    def check_target_framerate(self, current_framerate_hz, log):
        """Report whether the target framerate is reachable.

        Args:
            current_framerate_hz: Framerate of the original data in Hz.
            log: Dict that receives the boolean ``framerate_target_ok``.

        Returns:
            ``False`` (after printing a notice) when the target framerate
            is greater than ``current_framerate_hz``, otherwise ``True``.
        """
        exceeds_original = self.target_framerate_hz > current_framerate_hz
        if exceeds_original:
            print(
                f"Target framerate of {self.target_framerate_hz} Hz cannot exceed original framerate of {current_framerate_hz} Hz"
            )
        log["framerate_target_ok"] = not exceeds_original
        return not exceeds_original

    def sample_timestamps(self, timestamps, threshold_to_target, log):
        """Pick the timestamps closest to a regular target-framerate grid.

        The grid starts at the first timestamp and advances by
        ``1 / self.target_framerate_hz`` seconds up to the last timestamp.
        For every grid point the nearest timestamp is selected if it lies
        within ``threshold_to_target`` seconds and was not selected before.

        Args:
            timestamps: List of ``(index, datetime)`` tuples; the first
                and last entries delimit the grid.
            threshold_to_target: Maximum distance (s) between a grid point
                and the timestamp selected for it.
            log: Dict that receives ``n_original_timestamps``,
                ``n_target_timestamps``, ``n_selected_timestamps`` and,
                with at least two selections, ``framerate_hz_sampled``.

        Returns:
            Tuple ``(selected_indices, selected_timestamps,
            target_timestamps, selected_target_timestamps)``.
            ``selected_indices`` are positions within ``timestamps``, not
            the index stored in each tuple.

        Raises:
            ValueError: If the target framerate yields a grid step that
                does not advance in time.
        """
        target_timestamps = []
        selected_indices = []
        selected_timestamps = []
        selected_target_timestamps = []

        if timestamps:
            step = datetime.timedelta(seconds=1 / self.target_framerate_hz)
            if step <= datetime.timedelta(0):
                # Negative rates, or rates above timedelta's microsecond
                # resolution, would never reach the end of the grid.
                raise ValueError(
                    f"Target framerate of {self.target_framerate_hz} Hz does not "
                    "produce a positive time step"
                )

            # Generate target timestamps
            end_time = timestamps[-1][1]
            current_time = timestamps[0][1]
            while current_time <= end_time:
                target_timestamps.append(current_time)
                current_time += step

            already_selected = set()
            for target in target_timestamps:
                # Absolute distance (s) of every timestamp to the grid point
                time_diffs = np.abs(
                    [(target - t).total_seconds() for _, t in timestamps]
                )
                nearest_index = int(np.argmin(time_diffs))

                # Ensure no duplicates are selected
                if (
                    time_diffs[nearest_index] <= threshold_to_target
                    and nearest_index not in already_selected
                ):
                    already_selected.add(nearest_index)
                    selected_target_timestamps.append(target)
                    selected_indices.append(nearest_index)
                    selected_timestamps.append(timestamps[nearest_index])

        # Log statistics
        log["n_original_timestamps"] = len(timestamps)
        log["n_target_timestamps"] = len(target_timestamps)
        log["n_selected_timestamps"] = len(selected_timestamps)

        if len(selected_timestamps) >= 2:
            # Framerate of the selection, from the median gap
            timestamps_new = sorted(selected_timestamps, key=lambda x: x[1])
            time_differences_new = [
                (later[1] - earlier[1]).total_seconds()
                for earlier, later in zip(timestamps_new, timestamps_new[1:])
            ]
            median_time_diff_new = np.median(time_differences_new)
            if median_time_diff_new > 0:
                log["framerate_hz_sampled"] = 1 / median_time_diff_new
            else:
                log["framerate_hz_sampled"] = float("inf")
        else:
            print(f"Not enough selected timestamps ({len(selected_timestamps)}) to compute new framerate. Original number of timestamps: {len(timestamps)}")

        return (
            selected_indices,
            selected_timestamps,
            target_timestamps,
            selected_target_timestamps,
        )

    def update_upload_file(self, file_name, selected_indices, aws_source_path):
        """Write the selected lines to a new file and upload it to S3.

        The output is written next to ``file_name`` with the suffix
        ``_sampled_<target_framerate_hz>Hz`` and uploaded to
        ``S3_BUCKET_NAME`` under
        ``<target_framerate_hz>/<aws_source_path>_<output file name>``.

        Args:
            file_name: Path of the source file.
            selected_indices: Zero-based line numbers of ``file_name``
                to keep.
            aws_source_path: String prepended to the output file name in
                the S3 key.

        Returns:
            Size of the written file in MiB, or ``None`` if the upload
            step raised (the error is printed).
        """
        output_file_path = file_name + f"_sampled_{self.target_framerate_hz}Hz"

        # Set membership keeps the filter linear in the number of lines.
        wanted = set(selected_indices)
        with open(file_name, "r") as file:
            lines = [line for index, line in enumerate(file) if index in wanted]

        with open(output_file_path, "w") as output_file:
            output_file.writelines(lines)

        tail = os.path.basename(output_file_path)
        try:
            file_size_mb = os.path.getsize(output_file_path) / (1024 * 1024)
            s3.upload_file(
                output_file_path,
                S3_BUCKET_NAME,
                str(self.target_framerate_hz) + "/" + aws_source_path + "_" + tail,
            )
            return file_size_mb
        except Exception as e:
            # Best effort: report the failure and let the caller carry on.
            print(
                "S3 upload failed for file "
                + str(str(self.target_framerate_hz) + "/" + tail)
                + " - "
                + str(e)
            )
            return None
SampleTimestamps( file_path: str, target_framerate_hz=1, aws_bucket=None, aws_prefix=None)
29 def __init__( 30 self, file_path: str, target_framerate_hz=1, aws_bucket=None, aws_prefix=None 31 ): 32 self.file_path = file_path 33 self.aws_bucket = aws_bucket 34 self.aws_prefix = aws_prefix 35 self.target_framerate_hz = target_framerate_hz 36 self.valid_target_framerate = True 37 self.current_framerate_hz = None 38 self.timestamps = [] 39 40 if file_path: 41 self.execution_mode = "local" 42 elif aws_bucket and aws_prefix: 43 self.execution_mode = "aws" 44 else: 45 raise ValueError( 46 "Either file_path or aws_bucket and aws_prefix must be provided" 47 )
def
get_timestamps(self):
def get_timestamps(self):
    """Parse ``self.file_path`` into ``(line_index, timestamp)`` pairs.

    Two record layouts are recognised:

    * ``"time"`` + ``"data"`` keys: ``"time"`` is parsed as
      ``%Y-%m-%d %H:%M:%S.%f`` with a fallback to the same format without
      fractional seconds (naive datetimes).
    * ``"image"`` + ``"sensor_name"`` + ``"event_timestamp"`` keys:
      ``"event_timestamp"`` is read as a POSIX timestamp and converted to
      a timezone-aware UTC datetime.

    Blank lines and records matching neither layout are skipped, but still
    count towards the line index, so indices keep referring to physical
    lines of the file.

    Returns:
        List of ``(line_index, datetime)`` tuples in file order.

    Raises:
        ValueError: If a line is not valid JSON or a ``"time"`` value
            matches neither format.
    """
    timestamps = []

    with open(self.file_path, "r") as file:
        for index, line in enumerate(file):
            if not line.strip():
                continue  # e.g. a trailing empty line

            data = json.loads(line)
            if "time" in data and "data" in data:
                timestamp_raw = data.get("time")
                try:
                    # Most timestamps carry fractional seconds
                    timestamp = datetime.datetime.strptime(
                        timestamp_raw, "%Y-%m-%d %H:%M:%S.%f"
                    )
                except ValueError:
                    # Timestamps without fractional seconds
                    timestamp = datetime.datetime.strptime(
                        timestamp_raw, "%Y-%m-%d %H:%M:%S"
                    )
            elif (
                "image" in data
                and "sensor_name" in data
                and "event_timestamp" in data
            ):
                timestamp = datetime.datetime.fromtimestamp(
                    data.get("event_timestamp"), tz=datetime.timezone.utc
                )
            else:
                # Unknown layout: there is no timestamp to record, and
                # reusing the previous record's value would be wrong.
                continue

            timestamps.append((index, timestamp))

    return timestamps
def
get_framerate(self, timestamps, log):
def get_framerate(self, timestamps, log):
    """Estimate the original framerate and an outlier threshold.

    Args:
        timestamps: Iterable of ``(index, datetime)`` tuples.
        log: Dict that receives the gap statistics (keys prefixed
            ``time_s_``), ``framerate_hz_original``,
            ``framerate_hz_target`` and ``upper_bound_threshold``.

    Returns:
        Tuple ``(current_framerate_hz, timestamps, threshold)`` where
        ``timestamps`` is the input sorted by time and ``threshold`` is
        ``Q3 + 1.5 * IQR`` of the gaps between consecutive timestamps.

    Raises:
        ValueError: If fewer than two timestamps are given, because no
            gap can be measured.
    """
    timestamps = sorted(timestamps, key=lambda x: x[1])
    if len(timestamps) < 2:
        raise ValueError(
            "At least two timestamps are required to compute a framerate, "
            f"got {len(timestamps)}"
        )

    # Gaps (s) between consecutive timestamps
    time_differences = [
        (later[1] - earlier[1]).total_seconds()
        for earlier, later in zip(timestamps, timestamps[1:])
    ]

    # Statistics about timestamp distribution
    average_time_diff = np.mean(time_differences)
    median_time_diff = np.median(time_differences)
    std_time_diff = np.std(time_differences)
    min_time_diff = np.min(time_differences)
    max_time_diff = np.max(time_differences)
    range_time_diff = max_time_diff - min_time_diff
    q1_time_diff = np.percentile(time_differences, 25)
    q3_time_diff = np.percentile(time_differences, 75)

    # Median is more robust to outliers than the mean. A zero median
    # (mostly duplicated timestamps) means an unbounded framerate.
    if median_time_diff > 0:
        current_framerate_hz = 1 / median_time_diff
    else:
        current_framerate_hz = float("inf")

    log["time_s_avg_between_timestamps"] = average_time_diff
    log["time_s_median_between_timestamps"] = median_time_diff
    log["time_s_std_between_timestamps"] = std_time_diff
    log["time_s_min_between_timestamps"] = min_time_diff
    log["time_s_max_between_timestamps"] = max_time_diff
    log["time_s_range_between_timestamps"] = range_time_diff
    log["time_s_25_percentile_between_timestamps"] = q1_time_diff
    log["time_s_75_percentile_between_timestamps"] = q3_time_diff
    log["framerate_hz_original"] = current_framerate_hz
    log["framerate_hz_target"] = self.target_framerate_hz

    # Upper outlier bound according to the 1.5 * IQR rule
    interquartile_range = q3_time_diff - q1_time_diff
    upper_bound_threshold = q3_time_diff + 1.5 * interquartile_range
    log["upper_bound_threshold"] = upper_bound_threshold

    return current_framerate_hz, timestamps, upper_bound_threshold
def
check_target_framerate(self, current_framerate_hz, log):
def check_target_framerate(self, current_framerate_hz, log):
    """Report whether the target framerate is reachable.

    Args:
        current_framerate_hz: Framerate of the original data in Hz.
        log: Dict that receives the boolean ``framerate_target_ok``.

    Returns:
        ``False`` (after printing a notice) when the target framerate is
        greater than ``current_framerate_hz``, otherwise ``True``.
    """
    exceeds_original = self.target_framerate_hz > current_framerate_hz
    if exceeds_original:
        print(
            f"Target framerate of {self.target_framerate_hz} Hz cannot exceed original framerate of {current_framerate_hz} Hz"
        )

    target_ok = not exceeds_original
    log["framerate_target_ok"] = target_ok
    return target_ok
def
sample_timestamps(self, timestamps, threshold_to_target, log):
def sample_timestamps(self, timestamps, threshold_to_target, log):
    """Pick the timestamps closest to a regular target-framerate grid.

    The grid starts at the first timestamp and advances by
    ``1 / self.target_framerate_hz`` seconds up to the last timestamp. For
    every grid point the nearest timestamp is selected if it lies within
    ``threshold_to_target`` seconds and was not selected before.

    Args:
        timestamps: List of ``(index, datetime)`` tuples; the first and
            last entries delimit the grid.
        threshold_to_target: Maximum distance (s) between a grid point and
            the timestamp selected for it.
        log: Dict that receives ``n_original_timestamps``,
            ``n_target_timestamps``, ``n_selected_timestamps`` and, with
            at least two selections, ``framerate_hz_sampled``.

    Returns:
        Tuple ``(selected_indices, selected_timestamps, target_timestamps,
        selected_target_timestamps)``. ``selected_indices`` are positions
        within ``timestamps``, not the index stored in each tuple.
        Empty input yields four empty lists.

    Raises:
        ValueError: If the target framerate yields a grid step that does
            not advance in time.
    """
    # NOTE(review): update_upload_file compares these positions with file
    # line numbers; that only matches when list position equals line
    # number - confirm against the caller.
    target_timestamps = []
    selected_indices = []
    selected_timestamps = []
    selected_target_timestamps = []

    if timestamps:
        step = datetime.timedelta(seconds=1 / self.target_framerate_hz)
        if step <= datetime.timedelta(0):
            # Negative rates, or rates above timedelta's microsecond
            # resolution, would never reach the end of the grid.
            raise ValueError(
                f"Target framerate of {self.target_framerate_hz} Hz does not "
                "produce a positive time step"
            )

        # Generate target timestamps
        end_time = timestamps[-1][1]
        current_time = timestamps[0][1]
        while current_time <= end_time:
            target_timestamps.append(current_time)
            current_time += step

        already_selected = set()
        for target in target_timestamps:
            # Absolute distance (s) of every timestamp to the grid point
            time_diffs = np.abs(
                [(target - t).total_seconds() for _, t in timestamps]
            )
            nearest_index = int(np.argmin(time_diffs))

            # Ensure no duplicates are selected
            if (
                time_diffs[nearest_index] <= threshold_to_target
                and nearest_index not in already_selected
            ):
                already_selected.add(nearest_index)
                selected_target_timestamps.append(target)
                selected_indices.append(nearest_index)
                selected_timestamps.append(timestamps[nearest_index])

    # Log statistics
    log["n_original_timestamps"] = len(timestamps)
    log["n_target_timestamps"] = len(target_timestamps)
    log["n_selected_timestamps"] = len(selected_timestamps)

    if len(selected_timestamps) >= 2:
        # Framerate of the selection, from the median gap
        timestamps_new = sorted(selected_timestamps, key=lambda x: x[1])
        time_differences_new = [
            (later[1] - earlier[1]).total_seconds()
            for earlier, later in zip(timestamps_new, timestamps_new[1:])
        ]
        median_time_diff_new = np.median(time_differences_new)
        if median_time_diff_new > 0:
            log["framerate_hz_sampled"] = 1 / median_time_diff_new
        else:
            log["framerate_hz_sampled"] = float("inf")
    else:
        print(f"Not enough selected timestamps ({len(selected_timestamps)}) to compute new framerate. Original number of timestamps: {len(timestamps)}")

    return (
        selected_indices,
        selected_timestamps,
        target_timestamps,
        selected_target_timestamps,
    )
def
update_upload_file(self, file_name, selected_indices, aws_source_path):
def update_upload_file(self, file_name, selected_indices, aws_source_path):
    """Write the selected lines to a new file and upload it to S3.

    The output is written next to ``file_name`` with the suffix
    ``_sampled_<target_framerate_hz>Hz`` and uploaded to ``S3_BUCKET_NAME``
    under ``<target_framerate_hz>/<aws_source_path>_<output file name>``.

    Args:
        file_name: Path of the source file.
        selected_indices: Zero-based line numbers of ``file_name`` to keep.
        aws_source_path: String prepended to the output file name in the
            S3 key.

    Returns:
        Size of the written file in MiB, or ``None`` if the upload step
        raised (the error is printed).
    """
    output_file_path = file_name + f"_sampled_{self.target_framerate_hz}Hz"

    # Set membership keeps the filter linear in the number of lines.
    wanted = set(selected_indices)
    with open(file_name, "r") as file:
        lines = [line for index, line in enumerate(file) if index in wanted]

    with open(output_file_path, "w") as output_file:
        output_file.writelines(lines)

    tail = os.path.basename(output_file_path)
    try:
        file_size_mb = os.path.getsize(output_file_path) / (1024 * 1024)
        s3.upload_file(
            output_file_path,
            S3_BUCKET_NAME,
            str(self.target_framerate_hz) + "/" + aws_source_path + "_" + tail,
        )
        return file_size_mb
    except Exception as e:
        # Best effort: report the failure and let the caller carry on.
        print(
            "S3 upload failed for file "
            + str(str(self.target_framerate_hz) + "/" + tail)
            + " - "
            + str(e)
        )
        return None