From b643eb2d268af549b5f81c98bdd8bad9b65195fe Mon Sep 17 00:00:00 2001 From: Florez Ospina Juan Felipe Date: Thu, 13 Mar 2025 14:17:46 +0100 Subject: [PATCH] Refactored command line interface, generate_species_flags, and created functions to improve performance and code clarity. --- pipelines/steps/generate_flags.py | 157 +++++++++++++++--------------- 1 file changed, 78 insertions(+), 79 deletions(-) diff --git a/pipelines/steps/generate_flags.py b/pipelines/steps/generate_flags.py index 02341c2..b8ae31f 100644 --- a/pipelines/steps/generate_flags.py +++ b/pipelines/steps/generate_flags.py @@ -101,8 +101,8 @@ def generate_diagnostic_flags(data_table, validity_thresholds_dict): #) return new_data_table -# TODO: abstract some of the code in the command line main -def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict): + +def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict, flagsFolderPath, datetime_var : str = 't_start_Buf'): """Generate flags for columns in data_table based on flags_table @@ -121,37 +121,30 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict): print('Predefined_species:', predefined_species) - # Save output tables to csv file and save/or update data lineage record - filename, ext = os.path.splitext(parent_file) - path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv']) - variables_set = set(data_table.columns) print(variables_set) - manual_json_flags = [] - csv_flags = [] - # Inspect flags folder - for filename in os.listdir(path_to_output_folder): - if all([filename.endswith('.json'), 'metadata' not in filename, 'flag' in filename]): - manual_json_flags.append(filename) - elif filename.endswith('.csv'): - csv_flags.append(filename) - + manual_json_flags, csv_flags = get_flags_from_folder(flagsFolderPath) + + print(manual_json_flags,csv_flags) + if csv_flags: - flags_table = pd.read_csv(os.path.join(path_to_output_folder, csv_flags[0])) + flags_table = pd.read_csv(os.path.join(flagsFolderPath, csv_flags[0])) if 'numflag_any_diagnostic_flag' in flags_table.columns: - #renaming_map = {var: f'flag_{var}' for var in data_table.columns} - #data_table[renaming_map.keys()] = flags_table['flag_any_diagnostic_flag'].values - #data_table.rename(columns=renaming_map, inplace=True) - renaming_map = {} - for var in data_table.columns: - #print(var) - if (not datetime_var == var) and (var in predefined_species): - renaming_map[var] = f'numflag_{var}' - print(f'numflag_{var}') - data_table[var] = pd.Series(flags_table['numflag_any_diagnostic_flag'].values,dtype=np.int64) + # Define condition for renaming + required = lambda var: var != datetime_var and var in predefined_species + + # Create renaming map using dictionary comprehension + renaming_map = {var: f'numflag_{var}' for var in data_table.columns if required(var)} + + # Assign the same flag values across required columns using vectorized operation + variables = list(renaming_map.keys()) + data_table[variables] = np.tile( + flags_table['numflag_any_diagnostic_flag'].values[:, None], + (1, len(variables)) + ) print(renaming_map) data_table.rename(columns=renaming_map, inplace=True) else: @@ -169,7 +162,7 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict): #if f'flag_{varname}' in data_table.columns: try: # Load manually generate flag - with open(os.path.join(path_to_output_folder, flag_filename), 'r') as stream: + with open(os.path.join(flagsFolderPath, flag_filename), 'r') as stream: flag_dict = json.load(stream) t1 = pd.to_datetime(flag_dict.get('startdate')) @@ -192,18 +185,6 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict): # print(col) data_table = reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns) - - # Apply the ranking logic efficiently - #data_table.loc[t1_idx:t2_idx, numflag_columns] = data_table.loc[t1_idx:t2_idx, numflag_columns].map( - # lambda x: reconcile_flags(x, flag_code) - #) - - - #if 456 <= flag_code <= 800: - # data_table.loc[t1_idx:t2_idx, numflag_columns] = data_table.loc[t1_idx:t2_idx, numflag_columns].applymap(lambda x: max(x, flag_code)) - #else: - # data_table.loc[t1_idx:t2_idx, numflag_columns] = flag_code - except (KeyError, ValueError, FileNotFoundError) as e: print(f"Error processing {flag_filename}: {e}") continue @@ -247,28 +228,8 @@ def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns): # all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"] # all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"] -if __name__ == '__main__': - - # Set up argument parsing - parser = argparse.ArgumentParser(description="Generate flags for diagnostics and species variables.") - - parser.add_argument( - "--flag-type", - required=True, - choices=["diagnostics", "species", "cpd"], - help="Specify the flag type. Must be one of: diagnostics, species, cpd" - ) - parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.") - #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file') - #parser.add_argument('validity_thersholds_file', type=str, help="Path to the input YAML file containing calibration factors.") - #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.") - - args = parser.parse_args() - - flag_type = args.flag_type - data_file = args.data_file - - # Load input data and calibration factors +def main(data_file, flag_type): + # Open data file and load dataset associated with flag_type : either diagnostics or species try: dataManager = dataOps.HDF5DataOpsManager(args.data_file) dataManager.load_file_obj() @@ -292,16 +253,10 @@ if __name__ == '__main__': if flag_type == 'species': keywords = ['ACSM_JFJ_','_timeseries.txt/data_table'] find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']] - #dataset_name = dataset_metadata_df['dataset_name'][find_keyword] - #parent_file = dataset_metadata_df.loc[find_keyword,'parent_file'] - #parent_flag_file = '_'.join([os.path.splitext(parent_file),'flags.csv']) - #parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument'] - - dataset_name = dataset_metadata_df['dataset_name'][find_keyword] - parent_file = dataset_metadata_df.loc[find_keyword,'parent_file'] - #parent_flag_file = '_'.join([os.path.splitext(parent_file)[0],'flags.csv']) # Expected name - parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument'] + # Specify source dataset to be extracted from input hdf5 data file + columns = ['dataset_name','parent_file','parent_instrument'] + dataset_name, parent_file, parent_instrument = tuple(dataset_metadata_df.loc[find_keyword,col] for col in columns) print(':)') if not (dataset_name.size == 1): raise ValueError(f'{flag_type} file is not uniquely identifiable: {parent_file}') @@ -313,6 +268,7 @@ if __name__ == '__main__': data_table = dataManager.extract_dataset_as_dataframe(dataset_name) datetime_var, datetime_var_format = dataManager.infer_datetime_variable(dataset_name) + # Count the number of NaT (null) values num_nats = data_table[datetime_var].isna().sum() # Get the total number of rows @@ -325,20 +281,18 @@ if __name__ == '__main__': dataManager.unload_file_obj() - except Exception as e: + except Exception as e: print(f"Error loading input files: {e}") exit(1) - - path_to_output_dir, ext = os.path.splitext(args.data_file) - - print('Path to output directory :', path_to_output_dir) + finally: + dataManager.unload_file_obj() - # Perform calibration - - flag_type = args.flag_type + print('Starting flag generation.') try: + path_to_output_dir, ext = os.path.splitext(data_file) + print('Path to output directory :', path_to_output_dir) # Define output directory of apply_calibration_factors() step suffix = 'flags' if len(parent_instrument.split('/')) >= 2: @@ -367,7 +321,7 @@ if __name__ == '__main__': if flag_type == 'species': #calib_param_dict = load_parameters(flag_type) calib_param_dict = load_project_yaml_files(projectPath, "calibration_params.yaml") - flags_table = generate_species_flags(data_table,calib_param_dict) + flags_table = generate_species_flags(data_table,calib_param_dict,path_to_output_folder,datetime_var) metadata = {'actris_level' : 1, 'processing_script': processingScriptRelPath.replace(os.sep,'/'), @@ -396,3 +350,48 @@ if __name__ == '__main__': print(f"Error during calibration: {e}") exit(1) + +def get_flags_from_folder(flagsFolderPath): + # Get current state of flags folder, which will condition the species flagging + manual_json_flags = [] + csv_flags = [] + + # Loop through all files in the flags folder + for folderitem in os.listdir(flagsFolderPath): + + # Skip system-level metadata JSON file + if all([folderitem.endswith('.json'), 'metadata' in folderitem]): + continue + + # Identify manual flag JSON files with 'flag__.json' format + if folderitem.startswith('flag') and folderitem.endswith('.json'): + manual_json_flags.append(folderitem) + # Identify CSV flag files + elif folderitem.endswith('.csv'): + csv_flags.append(folderitem) + + # Return the lists of manual flag JSON files and CSV flag files + return manual_json_flags, csv_flags + +if __name__ == '__main__': + + # Set up argument parsing + parser = argparse.ArgumentParser(description="Generate flags for diagnostics and species variables.") + + parser.add_argument( + "--flag-type", + required=True, + choices=["diagnostics", "species", "cpd"], + help="Specify the flag type. Must be one of: diagnostics, species, cpd" + ) + parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.") + #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file') + #parser.add_argument('validity_thersholds_file', type=str, help="Path to the input YAML file containing calibration factors.") + #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.") + + args = parser.parse_args() + + flag_type = args.flag_type + data_file = args.data_file + + main(data_file, flag_type) \ No newline at end of file