utils.selector
import logging

from fiftyone import ViewField as F

from config.config import GLOBAL_SEED


def select_random(dataset, n_samples):
    """Select a random subset of samples from the dataset."""

    random_view = dataset.take(n_samples, seed=GLOBAL_SEED)
    return random_view


def select_by_class(dataset, classes_in=[], classes_out=[]):
    """Filters a dataset based on inclusion and exclusion of specified classes."""
    incl_conditions = None
    excl_conditions = None

    if classes_in:
        if isinstance(classes_in, str):
            classes_in = [classes_in]
        for class_in in classes_in:
            condition = F("ground_truth.detections.label").contains(class_in)
            incl_conditions = (
                condition if incl_conditions is None else incl_conditions | condition
            )

    if classes_out:
        if isinstance(classes_out, str):
            classes_out = [classes_out]
        for class_out in classes_out:
            condition = ~F("ground_truth.detections.label").contains(class_out)
            excl_conditions = (
                condition if excl_conditions is None else excl_conditions & condition
            )

    if incl_conditions is not None and excl_conditions is not None:
        conditions = incl_conditions & excl_conditions
    elif incl_conditions is not None:
        conditions = incl_conditions
    elif excl_conditions is not None:
        conditions = excl_conditions
    else:
        conditions = None

    view = dataset.match(conditions) if conditions is not None else dataset
    return view


def generate_view_embedding_selection(
    dataset,
    configuration,
    embedding_selection_field="embedding_selection",
    embedding_count_field="embedding_selection_count",
):
    """Returns the subset of the dataset whose embedding_count_field value is greater than or equal to the min_selection_count threshold."""
    n_samples_in = len(dataset)
    min_selection_count = configuration["min_selection_count"]
    view_selection_count = dataset.match(
        F(embedding_count_field) >= min_selection_count
    )

    n_samples_out = len(view_selection_count)
    logging.info(
        f"Sample Reduction: {n_samples_in} -> {n_samples_out}. Workflow 'Embedding Selection'"
    )
    return view_selection_count


def generate_view_anomaly_detection_selection(
    dataset, configuration, field_anomaly_score_root="pred_anomaly_score_"
):
    """Filters the dataset based on anomaly scores using a configured threshold."""
    n_samples_in = len(dataset)
    model_name = configuration["model"]
    min_anomaly_score = configuration["min_anomaly_score"]
    field_name_anomaly_score = field_anomaly_score_root + model_name
    view_anomaly_score = dataset.match(F(field_name_anomaly_score) >= min_anomaly_score)

    n_samples_out = len(view_anomaly_score)
    logging.info(
        f"Sample Reduction: {n_samples_in} -> {n_samples_out}. Workflow 'Anomaly Detection'"
    )
    return view_anomaly_score


def generate_view_ensemble_selection(
    dataset,
    configuration,
    ensemble_selection_field="n_unique_ensemble_selection",
    ensemble_selection_tag="detections_overlap",
):
    """Filters the dataset to samples that meet the minimum unique-selection count, keeping only labels with the specified tag."""

    n_samples_in = len(dataset)
    min_n_unique_selection = configuration["min_n_unique_selection"]
    view_n_unique_exploration = dataset.match(
        F(ensemble_selection_field) >= min_n_unique_selection
    )
    view_tagged_labels = view_n_unique_exploration.select_labels(
        tags=ensemble_selection_tag
    )

    n_samples_out = len(view_tagged_labels)
    logging.info(
        f"Sample Reduction: {n_samples_in} -> {n_samples_out}. Workflow 'Ensemble Selection'"
    )

    return view_tagged_labels
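All of these helpers return FiftyOne views rather than modified datasets, so they can be chained cheaply. A minimal usage sketch, assuming utils.selector and its config.config.GLOBAL_SEED dependency are importable, and using the FiftyOne quickstart zoo dataset as a stand-in for any dataset with ground_truth detections:

import fiftyone.zoo as foz

from utils.selector import select_by_class, select_random

# The quickstart zoo dataset ships with a "ground_truth" detections field
dataset = foz.load_zoo_dataset("quickstart")

# Views are lazy: each selector narrows the previous result without copying samples
animals = select_by_class(dataset, classes_in=["cat", "dog"], classes_out=["person"])
subset = select_random(animals, n_samples=50)
print(subset)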
def select_random(dataset, n_samples):
Select a random subset of samples from the dataset.
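A short sketch; because the seed is fixed to GLOBAL_SEED from config.config, repeated calls yield the same subset:

import fiftyone.zoo as foz

from utils.selector import select_random

dataset = foz.load_zoo_dataset("quickstart")

# Deterministic subset; take() returns at most len(dataset) samples
view = select_random(dataset, n_samples=100)
print(len(view))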
def select_by_class(dataset, classes_in=[], classes_out=[]):
Filters a dataset based on inclusion and exclusion of specified classes.
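Inclusion conditions are OR-ed (a sample qualifies if it contains any classes_in label), while exclusion conditions are AND-ed (a sample is dropped if it contains any classes_out label). Note that match() keeps or drops whole samples, so surviving samples retain all of their detections. A sketch against the quickstart dataset:

import fiftyone.zoo as foz

from utils.selector import select_by_class

dataset = foz.load_zoo_dataset("quickstart")

# Samples containing a cat OR a dog, but never a person
view = select_by_class(dataset, classes_in=["cat", "dog"], classes_out=["person"])

# A bare string is promoted to a one-element list internally
cats = select_by_class(dataset, classes_in="cat")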
def generate_view_embedding_selection(dataset, configuration, embedding_selection_field="embedding_selection", embedding_count_field="embedding_selection_count"):
Returns the subset of the dataset whose embedding_count_field value is greater than or equal to the min_selection_count threshold. (The embedding_selection_field parameter is accepted but not used by the function body.)
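A sketch under the assumption that an upstream embedding workflow has already written an integer embedding_selection_count field to each sample; the configuration dict only needs the min_selection_count key:

from utils.selector import generate_view_embedding_selection

# "dataset" is any fo.Dataset whose samples carry "embedding_selection_count";
# the threshold value here is a hypothetical choice
configuration = {"min_selection_count": 2}
view = generate_view_embedding_selection(dataset, configuration)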
def generate_view_anomaly_detection_selection(dataset, configuration, field_anomaly_score_root="pred_anomaly_score_"):
Filters the dataset based on anomaly scores using a configured threshold.
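The score field is resolved by string concatenation, field_anomaly_score_root + configuration["model"]. A sketch assuming a hypothetical anomaly model named "padim" has populated pred_anomaly_score_padim on each sample:

from utils.selector import generate_view_anomaly_detection_selection

# Reads the "model" and "min_anomaly_score" keys; matches on the
# "pred_anomaly_score_padim" field in this example
configuration = {"model": "padim", "min_anomaly_score": 0.5}
view = generate_view_anomaly_detection_selection(dataset, configuration)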
def generate_view_ensemble_selection(dataset, configuration, ensemble_selection_field="n_unique_ensemble_selection", ensemble_selection_tag="detections_overlap"):
Filters the dataset to samples that meet the minimum unique-selection count, keeping only labels with the specified tag.
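The filter runs in two stages: samples are first matched on the count field, then select_labels() keeps only the labels carrying the tag, which also drops samples left with no tagged labels. A sketch assuming an upstream ensemble workflow has tagged overlapping detections with "detections_overlap" and written n_unique_ensemble_selection on each sample:

from utils.selector import generate_view_ensemble_selection

# Hypothetical threshold: at least three ensemble members must agree
configuration = {"min_n_unique_selection": 3}
view = generate_view_ensemble_selection(dataset, configuration)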