diff --git a/pipelines/params/calibration_factors.yaml b/pipelines/params/calibration_factors.yaml new file mode 100644 index 0000000..fbcfdee --- /dev/null +++ b/pipelines/params/calibration_factors.yaml @@ -0,0 +1,22 @@ +standard : + num: { IE : 145.9, AB_ref_correct: 254000} + den: { IE_correct : 146.9, ABRefWave : 254001} + +#all_dat[, SO4_correct := (SO4_11000 * IE * RIE_SO4 * AB_ref_correct) / (IE_correct * RIE_SO4_correct * ABRefWave)]; +SO4_11000 : + num: { IE : 145.9, AB_ref_correct: 254000, RIE_SO4 : 0.63} + den: { IE_correct : 146.9, ABRefWave : 254001, RIE_SO4_correct : 0.73} + +SO4_98_11000 : + num: { one : 1 } + den: { CE_annual_avg : 1, RIE_SO4_annual_avg : 1 } + +#all_dat[, NH4_correct := (NH4_11000 * IE * RIE_NH4 * AB_ref_correct) / (IE_correct * RIE_NH4_correct * ABRefWave)]; +NH4_11000 : + num: { IE : 145.9, AB_ref_correct: 254000, RIE_NH4 : 3.495} + den: { IE_correct : 146.9, ABRefWave : 254001, RIE_NH4_correct : 3.595} + +Org_44_110000 : + num: { one : 1} + den: { CE : 1, RIE_Org : 1.4 } + diff --git a/pipelines/params/diagnostic_variable_limits.yaml b/pipelines/params/diagnostic_variable_limits.yaml new file mode 100644 index 0000000..df38da9 --- /dev/null +++ b/pipelines/params/diagnostic_variable_limits.yaml @@ -0,0 +1,18 @@ +# Define limits for diagnostic variables (src: data///global_config.r) + +VaporizerTemp_C : + lower_lim : {value : 400, description : "heater"} + upper_lim : {value : 610, description : "heater"} + +ABsamp : + lower_lim : {value : 20000, description : "not specified yet"} + upper_lim : {value : 500000, description : "not specified yet"} + +FlowRate_ccs : + lower_lim : {value : 1.23, description : "not specified yet"} + upper_lim : {value : 1.45, description : "not specified yet"} + +FilamentEmission_mA : + lower_lim : {value : 0.65, description : "not specified yet"} + upper_lim : {value : 1.5, description : "not specified yet"} + diff --git a/pipelines/steps/apply_calibration_factors.py b/pipelines/steps/apply_calibration_factors.py new file mode 100644 index 0000000..c4b1b41 --- /dev/null +++ b/pipelines/steps/apply_calibration_factors.py @@ -0,0 +1,167 @@ + +import sys, os + +try: + thisFilePath = os.path.abspath(__file__) + print(thisFilePath) +except NameError: + print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).") + print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)") + #print("Otherwise, path to submodule DIMA may not be resolved properly.") + thisFilePath = os.getcwd() # Use current directory or specify a default + +dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..",'..')) # Move up to project root +projectPath = os.path.normpath(os.path.join(dimaPath,'..')) +print(dimaPath) +import numpy as np +import pandas as pd +from math import prod # To replace multiplyall +import argparse +import yaml + +# Set up project root directory +#root_dir = os.path.abspath(os.curdir) +#sys.path.append(root_dir) +sys.path.append(dimaPath) + +import dima.src.hdf5_ops as dataOps + + + +def apply_calibration_factors(data_table, calibration_factors): + """ + Calibrates the species data in the given data table using a calibration factor. + + Parameters: + data_table (pd.DataFrame): The input data table with variables to calibrate. + calibration_factor (dict): Dictionary containing 'standard' calibration factors + with 'num' and 'den' values as dictionaries of multipliers. + + Returns: + pd.DataFrame: A new data table with calibrated variables. + """ + # Make a copy of the input table to avoid modifying the original + new_data_table = data_table.copy() + + # Initialize a dictionary to rename variables + variable_rename_dict = {} + + # Loop through the column names in the data table + for variable_name in new_data_table.select_dtypes(include=["number"]).columns: + + if not variable_name in calibration_factors.keys(): # use standard calibration factor + # Extract numerator and denominator values + numerator = prod(value for key, value in calibration_factors['standard']['num'].items()) + denominator = prod(value for key, value in calibration_factors['standard']['den'].items()) + + else: # use specifies dependent calibration factor + + #print(variable_name) + #print([key for key in calibration_factors[variable_name]]) + numerator = prod(value for key, value in calibration_factors[variable_name]['num'].items()) + denominator = prod(value for key, value in calibration_factors[variable_name]['den'].items()) + + # Apply calibration to each variable + new_data_table[variable_name] = new_data_table[variable_name].mul((numerator / denominator)) + + # Add renaming entry + variable_rename_dict[variable_name] = f"{variable_name}_correct" + + # Rename the columns in the new data table + new_data_table.rename(columns=variable_rename_dict, inplace=True) + + return new_data_table + +if __name__ == '__main__': + + # Set up argument parsing + parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.") + parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.") + parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file') + parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.") + parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.") + + args = parser.parse_args() + + # Load input data and calibration factors + try: + #data_table = pd.read_json(args.data_file) + + print(args.data_file) + + dataManager = dataOps.HDF5DataOpsManager(args.data_file) + dataManager.load_file_obj() + dataset_name = '/'+args.dataset_name + data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name) + + dataManager.extract_and_load_dataset_metadata() + dataset_metadata_df = dataManager.dataset_metadata_df.copy() + print(dataset_metadata_df.head()) + + dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()] + data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:] + parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0] + parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0] + + dataManager.unload_file_obj() + print(args.calibration_file) + + with open(args.calibration_file, 'r') as stream: + calibration_factors = yaml.load(stream, Loader=yaml.FullLoader) + except Exception as e: + print(f"Error loading input files: {e}") + exit(1) + + path_to_output_dir, ext = os.path.splitext(args.data_file) + + print('Path to output directory :', path_to_output_dir) + + + + # Perform calibration + try: + + processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath) + + print(processingScriptRelPath) + + metadata = {'actris_level' : 1, 'processing_script': processingScriptRelPath.replace(os.sep,'/')} + + path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir,parent_instrument,parent_file])) + path_to_calibrated_file = ''.join([path_to_output_file, '_calibrated.csv']) + + path_tail, path_head = os.path.split(path_to_calibrated_file) + path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json']) + + print('Path to output file :', path_to_calibrated_file) + import dima.utils.g5505_utils as utils + import json + calibrated_table = apply_calibration_factors(data_table, calibration_factors) + metadata['processing_date'] = utils.created_at() + calibrated_table.to_csv(path_to_calibrated_file, index=False) + + # Ensure the file exists + if not os.path.exists(path_to_metadata_file): + with open(path_to_metadata_file, 'w') as f: + json.dump({}, f) # Initialize empty JSON + + # Read the existing JSON + with open(path_to_metadata_file, 'r') as metadata_file: + try: + json_dict = json.load(metadata_file) + except json.JSONDecodeError: + json_dict = {} # Start fresh if file is invalid + + # Update the JSON object + outputfileRelPath = os.path.relpath(path_to_calibrated_file, start=projectPath).replace(os.sep, '/') + json_dict[outputfileRelPath] = metadata + + # Write updated JSON back to the file + with open(path_to_metadata_file, 'w') as metadata_file: + json.dump(json_dict, metadata_file, indent=4) + + print(f"Calibrated data saved to {path_to_calibrated_file}") + print(f"Metadata for calibrated data saved to {path_to_metadata_file}") + except Exception as e: + print(f"Error during calibration: {e}") + exit(1) \ No newline at end of file diff --git a/pipelines/steps/create_flags_for_diagnostic_vars.py b/pipelines/steps/create_flags_for_diagnostic_vars.py new file mode 100644 index 0000000..d9a5c37 --- /dev/null +++ b/pipelines/steps/create_flags_for_diagnostic_vars.py @@ -0,0 +1,167 @@ +import sys, os + +try: + thisFilePath = os.path.abspath(__file__) + print(thisFilePath) +except NameError: + print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).") + print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)") + #print("Otherwise, path to submodule DIMA may not be resolved properly.") + thisFilePath = os.getcwd() # Use current directory or specify a default + +import numpy as np +import pandas as pd +import argparse + +import yaml, json + +dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..",'..')) # Move up to project root +projectPath = os.path.normpath(os.path.join(dimaPath,'..')) +print(dimaPath) +sys.path.append(dimaPath) +import dima.src.hdf5_ops as dataOps + + +def create_flags_for_diagnostic_vars(data_table, variable_limits): + """ + Create indicator variables that check whether a particular diagnostic variable is within + pre-specified/acceptable limits, which are defined by `variable_limits`. + + Parameters: + data_table (pd.DataFrame): The input data table with variables to calibrate. + variable_limits (dict): Dictionary mapping diagnostic-variables to their limits, e.g., + { + 'ABsamp': { + 'lower_lim': {'value': 20000, 'description': "not specified yet"}, + 'upper_lim': {'value': 500000, 'description': "not specified yet"} + } + } + + Returns: + pd.DataFrame: A new data table with calibrated variables, containing the original columns + and additional indicator variables, representing flags. + """ + + # Initialize a dictionary to store indicator variables + indicator_variables = {} + + # Loop through the column names in the data table + for diagnostic_variable in data_table.columns: + + # Skip if the diagnostic variable is not in variable_limits + if diagnostic_variable not in variable_limits: + print(f'Diagnostic variable {diagnostic_variable} has not defined limits in {variable_limits}.') + continue + + # Get lower and upper limits for diagnostic_variable from variable limits dict + lower_lim = variable_limits[diagnostic_variable]['lower_lim']['value'] + upper_lim = variable_limits[diagnostic_variable]['upper_lim']['value'] + + # Create an indicator variable for the current diagnostic variable + tmp = data_table[diagnostic_variable] + indicator_variables['flag_'+diagnostic_variable] = ((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy() + + # Add indicator variables to the new data table + new_data_table = pd.DataFrame(indicator_variables) + + return new_data_table + + +# all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"] +# all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"] +# all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"] +# all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"] + +if __name__ == '__main__': + + # Set up argument parsing + parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.") + parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.") + parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file') + parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.") + #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.") + + args = parser.parse_args() + + # Load input data and calibration factors + try: + #data_table = pd.read_json(args.data_file) + + print(args.data_file) + + dataManager = dataOps.HDF5DataOpsManager(args.data_file) + dataManager.load_file_obj() + dataset_name = '/'+args.dataset_name + data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name) + + dataManager.extract_and_load_dataset_metadata() + dataset_metadata_df = dataManager.dataset_metadata_df.copy() + print(dataset_metadata_df.head()) + + dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()] + data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:] + parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0] + parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0] + + dataManager.unload_file_obj() + print(args.calibration_file) + + with open(args.calibration_file, 'r') as stream: + calibration_factors = yaml.load(stream, Loader=yaml.FullLoader) + except Exception as e: + print(f"Error loading input files: {e}") + exit(1) + + path_to_output_dir, ext = os.path.splitext(args.data_file) + + print('Path to output directory :', path_to_output_dir) + + + + # Perform calibration + try: + + processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath) + + print(processingScriptRelPath) + + metadata = {'actris_level' : 1, 'processing_script': processingScriptRelPath.replace(os.sep,'/')} + + path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir,parent_instrument,parent_file])) + path_to_calibrated_file = ''.join([path_to_output_file, '_flags.csv']) + + path_tail, path_head = os.path.split(path_to_calibrated_file) + path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json']) + + print('Path to output file :', path_to_calibrated_file) + import dima.utils.g5505_utils as utils + import json + calibrated_table = create_flags_for_diagnostic_vars(data_table, calibration_factors) + metadata['processing_date'] = utils.created_at() + calibrated_table.to_csv(path_to_calibrated_file, index=False) + + # Ensure the file exists + if not os.path.exists(path_to_metadata_file): + with open(path_to_metadata_file, 'w') as f: + json.dump({}, f) # Initialize empty JSON + + # Read the existing JSON + with open(path_to_metadata_file, 'r') as metadata_file: + try: + json_dict = json.load(metadata_file) + except json.JSONDecodeError: + json_dict = {} # Start fresh if file is invalid + + # Update the JSON object + outputfileRelPath = os.path.relpath(path_to_calibrated_file, start=projectPath).replace(os.sep, '/') + json_dict[outputfileRelPath] = metadata + + # Write updated JSON back to the file + with open(path_to_metadata_file, 'w') as metadata_file: + json.dump(json_dict, metadata_file, indent=4) + + print(f"Calibrated data saved to {path_to_calibrated_file}") + print(f"Metadata for calibrated data saved to {path_to_metadata_file}") + except Exception as e: + print(f"Error during calibration: {e}") + exit(1) \ No newline at end of file