From 2c2b154528cd38c94f0776b306de938546314ef3 Mon Sep 17 00:00:00 2001 From: Juan Felipe Florez Ospina Date: Wed, 5 Feb 2025 18:15:43 +0100 Subject: [PATCH] Update to account for yaml file attribute renamings. --- .../steps/create_flags_for_diagnostic_vars.py | 334 +++++++++--------- 1 file changed, 168 insertions(+), 166 deletions(-) diff --git a/pipelines/steps/create_flags_for_diagnostic_vars.py b/pipelines/steps/create_flags_for_diagnostic_vars.py index d9a5c37..0dd81cc 100644 --- a/pipelines/steps/create_flags_for_diagnostic_vars.py +++ b/pipelines/steps/create_flags_for_diagnostic_vars.py @@ -1,167 +1,169 @@ -import sys, os - -try: - thisFilePath = os.path.abspath(__file__) - print(thisFilePath) -except NameError: - print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).") - print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)") - #print("Otherwise, path to submodule DIMA may not be resolved properly.") - thisFilePath = os.getcwd() # Use current directory or specify a default - -import numpy as np -import pandas as pd -import argparse - -import yaml, json - -dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..",'..')) # Move up to project root -projectPath = os.path.normpath(os.path.join(dimaPath,'..')) -print(dimaPath) -sys.path.append(dimaPath) -import dima.src.hdf5_ops as dataOps - - -def create_flags_for_diagnostic_vars(data_table, variable_limits): - """ - Create indicator variables that check whether a particular diagnostic variable is within - pre-specified/acceptable limits, which are defined by `variable_limits`. - - Parameters: - data_table (pd.DataFrame): The input data table with variables to calibrate. - variable_limits (dict): Dictionary mapping diagnostic-variables to their limits, e.g., - { - 'ABsamp': { - 'lower_lim': {'value': 20000, 'description': "not specified yet"}, - 'upper_lim': {'value': 500000, 'description': "not specified yet"} - } - } - - Returns: - pd.DataFrame: A new data table with calibrated variables, containing the original columns - and additional indicator variables, representing flags. 
- """ - - # Initialize a dictionary to store indicator variables - indicator_variables = {} - - # Loop through the column names in the data table - for diagnostic_variable in data_table.columns: - - # Skip if the diagnostic variable is not in variable_limits - if diagnostic_variable not in variable_limits: - print(f'Diagnostic variable {diagnostic_variable} has not defined limits in {variable_limits}.') - continue - - # Get lower and upper limits for diagnostic_variable from variable limits dict - lower_lim = variable_limits[diagnostic_variable]['lower_lim']['value'] - upper_lim = variable_limits[diagnostic_variable]['upper_lim']['value'] - - # Create an indicator variable for the current diagnostic variable - tmp = data_table[diagnostic_variable] - indicator_variables['flag_'+diagnostic_variable] = ((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy() - - # Add indicator variables to the new data table - new_data_table = pd.DataFrame(indicator_variables) - - return new_data_table - - -# all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"] -# all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"] -# all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"] -# all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"] - -if __name__ == '__main__': - - # Set up argument parsing - parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.") - parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.") - parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file') - parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.") - #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.") - - args = parser.parse_args() - - # Load input data and calibration factors - try: - #data_table = pd.read_json(args.data_file) - - print(args.data_file) - - dataManager = dataOps.HDF5DataOpsManager(args.data_file) - dataManager.load_file_obj() - dataset_name = '/'+args.dataset_name - data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name) - - dataManager.extract_and_load_dataset_metadata() - dataset_metadata_df = dataManager.dataset_metadata_df.copy() - print(dataset_metadata_df.head()) - - dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()] - data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:] - parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0] - parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0] - - dataManager.unload_file_obj() - print(args.calibration_file) - - with open(args.calibration_file, 'r') as stream: - calibration_factors = yaml.load(stream, Loader=yaml.FullLoader) - except Exception as e: - print(f"Error loading input files: {e}") - exit(1) - - path_to_output_dir, ext = os.path.splitext(args.data_file) - - print('Path to output directory :', path_to_output_dir) - - - - # Perform calibration - try: - - processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath) - - print(processingScriptRelPath) - - metadata = {'actris_level' : 1, 'processing_script': processingScriptRelPath.replace(os.sep,'/')} - - 
path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir,parent_instrument,parent_file])) - path_to_calibrated_file = ''.join([path_to_output_file, '_flags.csv']) - - path_tail, path_head = os.path.split(path_to_calibrated_file) - path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json']) - - print('Path to output file :', path_to_calibrated_file) - import dima.utils.g5505_utils as utils - import json - calibrated_table = create_flags_for_diagnostic_vars(data_table, calibration_factors) - metadata['processing_date'] = utils.created_at() - calibrated_table.to_csv(path_to_calibrated_file, index=False) - - # Ensure the file exists - if not os.path.exists(path_to_metadata_file): - with open(path_to_metadata_file, 'w') as f: - json.dump({}, f) # Initialize empty JSON - - # Read the existing JSON - with open(path_to_metadata_file, 'r') as metadata_file: - try: - json_dict = json.load(metadata_file) - except json.JSONDecodeError: - json_dict = {} # Start fresh if file is invalid - - # Update the JSON object - outputfileRelPath = os.path.relpath(path_to_calibrated_file, start=projectPath).replace(os.sep, '/') - json_dict[outputfileRelPath] = metadata - - # Write updated JSON back to the file - with open(path_to_metadata_file, 'w') as metadata_file: - json.dump(json_dict, metadata_file, indent=4) - - print(f"Calibrated data saved to {path_to_calibrated_file}") - print(f"Metadata for calibrated data saved to {path_to_metadata_file}") - except Exception as e: - print(f"Error during calibration: {e}") +import sys, os + +try: + thisFilePath = os.path.abspath(__file__) + print(thisFilePath) +except NameError: + print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).") + print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)") + #print("Otherwise, path to submodule DIMA may not be resolved properly.") + thisFilePath = os.getcwd() # Use current directory or specify a default + +import numpy as np +import pandas as pd +import argparse + +import yaml, json + +dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..",'..')) # Move up to project root +projectPath = os.path.normpath(os.path.join(dimaPath,'..')) +print(dimaPath) +sys.path.append(dimaPath) +import dima.src.hdf5_ops as dataOps + + +def create_flags_for_diagnostic_vars(data_table, validity_thresholds_dict): + """ + Create indicator variables that check whether a particular diagnostic variable is within + pre-specified/acceptable limits, which are defined by `variable_limits`. + + Parameters: + data_table (pd.DataFrame): The input data table with variables to calibrate. + variable_limits (dict): Dictionary mapping diagnostic-variables to their limits, e.g., + { + 'ABsamp': { + 'lower_lim': {'value': 20000, 'description': "not specified yet"}, + 'upper_lim': {'value': 500000, 'description': "not specified yet"} + } + } + + Returns: + pd.DataFrame: A new data table with calibrated variables, containing the original columns + and additional indicator variables, representing flags. 
+ """ + + # Initialize a dictionary to store indicator variables + indicator_variables = {} + + # Loop through the column names in the data table + for diagnostic_variable in data_table.columns: + print(diagnostic_variable) + # Skip if the diagnostic variable is not in variable_limits + if diagnostic_variable not in validity_thresholds_dict['validity_thresholds']['variables']: + print(f'Diagnostic variable {diagnostic_variable} has not defined limits in {validity_thresholds_dict}.') + continue + + # Get lower and upper limits for diagnostic_variable from variable limits dict + variable_ranges = validity_thresholds_dict['validity_thresholds']['variables'][diagnostic_variable] + lower_lim = variable_ranges['lower_lim'] + upper_lim = variable_ranges['upper_lim'] + + # Create an indicator variable for the current diagnostic variable + tmp = data_table[diagnostic_variable] + indicator_variables['flag_'+diagnostic_variable] = ((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy() + + # Add indicator variables to the new data table + new_data_table = pd.DataFrame(indicator_variables) + + return new_data_table + + +# all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"] +# all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"] +# all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"] +# all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"] + +if __name__ == '__main__': + + # Set up argument parsing + parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.") + parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.") + parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file') + parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.") + #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.") + + args = parser.parse_args() + + # Load input data and calibration factors + try: + #data_table = pd.read_json(args.data_file) + + print(args.data_file) + + dataManager = dataOps.HDF5DataOpsManager(args.data_file) + dataManager.load_file_obj() + dataset_name = '/'+args.dataset_name + data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name) + + dataManager.extract_and_load_dataset_metadata() + dataset_metadata_df = dataManager.dataset_metadata_df.copy() + print(dataset_metadata_df.head()) + + dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()] + data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:] + parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0] + parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0] + + dataManager.unload_file_obj() + print(args.calibration_file) + + with open(args.calibration_file, 'r') as stream: + calibration_factors = yaml.load(stream, Loader=yaml.FullLoader) + except Exception as e: + print(f"Error loading input files: {e}") + exit(1) + + path_to_output_dir, ext = os.path.splitext(args.data_file) + + print('Path to output directory :', path_to_output_dir) + + + + # Perform calibration + try: + + processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath) + + 
print(processingScriptRelPath) + + metadata = {'actris_level' : 1, 'processing_script': processingScriptRelPath.replace(os.sep,'/')} + + path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir,parent_instrument,parent_file])) + path_to_calibrated_file = ''.join([path_to_output_file, '_flags.csv']) + + path_tail, path_head = os.path.split(path_to_calibrated_file) + path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json']) + + print('Path to output file :', path_to_calibrated_file) + import dima.utils.g5505_utils as utils + import json + print(calibration_factors.keys()) + calibrated_table = create_flags_for_diagnostic_vars(data_table, calibration_factors) + metadata['processing_date'] = utils.created_at() + calibrated_table.to_csv(path_to_calibrated_file, index=False) + + # Ensure the file exists + if not os.path.exists(path_to_metadata_file): + with open(path_to_metadata_file, 'w') as f: + json.dump({}, f) # Initialize empty JSON + + # Read the existing JSON + with open(path_to_metadata_file, 'r') as metadata_file: + try: + json_dict = json.load(metadata_file) + except json.JSONDecodeError: + json_dict = {} # Start fresh if file is invalid + + # Update the JSON object + outputfileRelPath = os.path.relpath(path_to_calibrated_file, start=projectPath).replace(os.sep, '/') + json_dict[outputfileRelPath] = metadata + + # Write updated JSON back to the file + with open(path_to_metadata_file, 'w') as metadata_file: + json.dump(json_dict, metadata_file, indent=4) + + print(f"Calibrated data saved to {path_to_calibrated_file}") + print(f"Metadata for calibrated data saved to {path_to_metadata_file}") + except Exception as e: + print(f"Error during calibration: {e}") exit(1) \ No newline at end of file
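
The renamed YAML attributes this patch accounts for imply that the per-variable limits are now read from validity_thresholds -> variables -> <variable_name> -> lower_lim / upper_lim, i.e. the limits are plain numbers rather than the earlier {value, description} nesting still shown in the docstring example. Below is a minimal sketch of how the updated function consumes that structure; the variable name 'ABsamp' and the numeric limits are illustrative assumptions (taken from the docstring example, not from the real YAML file), and the import assumes the project root and the dima submodule are on sys.path.

    import pandas as pd

    # Structure the renamed YAML file is assumed to parse into (illustrative values):
    validity_thresholds_dict = {
        'validity_thresholds': {
            'variables': {
                'ABsamp': {'lower_lim': 20000, 'upper_lim': 500000},
            }
        }
    }

    # Example diagnostic data; values outside [lower_lim, upper_lim] come back flagged False.
    data_table = pd.DataFrame({'ABsamp': [15000, 250000, 600000]})

    # Assumes the patched module is importable as a package from the project root.
    from pipelines.steps.create_flags_for_diagnostic_vars import create_flags_for_diagnostic_vars

    flags = create_flags_for_diagnostic_vars(data_table, validity_thresholds_dict)
    print(flags)  # column 'flag_ABsamp': False, True, False

Because lower_lim and upper_lim are now accessed directly (no nested 'value' key), any variable missing from validity_thresholds -> variables is simply skipped and gets no flag column, as in the loop above.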