diff --git a/pipelines/params/calibration_factors.yaml b/pipelines/params/calibration_factors.yaml index fbcfdee..0718184 100644 --- a/pipelines/params/calibration_factors.yaml +++ b/pipelines/params/calibration_factors.yaml @@ -1,22 +1,102 @@ -standard : - num: { IE : 145.9, AB_ref_correct: 254000} - den: { IE_correct : 146.9, ABRefWave : 254001} +# Define common factors +factors: &factors + # Get values from data///config_acsm_.r, values used in Tofware analysis. + IE: 145.9 + ABRefWave: 254000 # TODO: verify if AB_ref in the config file is the same as ABRefWave + RIE_SO4: 0.63 + RIE_NH4: 3.495 + RIE_Org : 1.4 + # Get values from data///cal.csv + IE_correct: 145.9 + AB_ref_correct: 254000 + RIE_SO4_correct: 0.63 + RIE_NH4_correct: 3.495 + RIE_Org_correct : 1.4 + flow_ref_correct : 1.36 + +# Define mappings for associated variables +variables: + # all_dat[, NO3_correct := (NO3_11000 * IE * AB_ref_correct) / (IE_correct * ABRefWave)] + NO3_11000: + num: [*factors.IE, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.ABRefWave] + + # all_dat[, SO4_correct := (SO4_11000 * IE * RIE_SO4 * AB_ref_correct) / (IE_correct * RIE_SO4_correct * ABRefWave)] + SO4_11000: + num: [*factors.IE, *factors.RIE_SO4, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.RIE_SO4_correct, *factors.ABRefWave] + + # all_dat[, NH4_correct := (NH4_11000 * IE * RIE_NH4 * AB_ref_correct) / (IE_correct * RIE_NH4_correct * ABRefWave)] + NH4_11000: + num: [*factors.IE, *factors.RIE_NH4, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.RIE_NH4_correct, *factors.ABRefWave] + + # all_dat[, Org_correct := (Org_11000 * IE * AB_ref_correct) / (IE_correct * ABRefWave)] + Org_11000: + num: [*factors.IE, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.ABRefWave] + + # all_dat[, Chl_correct := (Chl_11000 * IE * AB_ref_correct) / (IE_correct * ABRefWave)] + Chl_11000: + num: [*factors.IE, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.ABRefWave] + + # all_dat[, Org_44_11000_correct := (Org_44_11000 * IE * AB_ref_correct) / (IE_correct * ABRefWave)] + Org_44_11000: + num: [*factors.IE, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.ABRefWave] + + # all_dat[, Org_43_11000_correct := (Org_43_11000 * IE * AB_ref_correct) / (IE_correct * ABRefWave)] + Org_43_11000: + num: [*factors.IE, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.ABRefWave] + + # all_dat[, Org_60_11000_correct := (Org_60_11000 * IE * AB_ref_correct) / (IE_correct * ABRefWave)] + + Org_60_11000: + num: [*factors.IE, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.ABRefWave] + + # all_dat[, NO3_30_11000_correct := (NO3_30_11000 * IE * AB_ref_correct) / (IE_correct * ABRefWave)] + + NO3_30_11000: + num: [*factors.IE, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.ABRefWave] + + # all_dat[, SO4_98_11000_correct := (SO4_98_11000 * IE * RIE_SO4 * AB_ref_correct) / (IE_correct * RIE_SO4_correct * ABRefWave)] + + SO4_98_11000: + num: [*factors.IE, *factors.RIE_SO4, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.RIE_SO4_correct, *factors.ABRefWave] + + # all_dat[, SO4_81_11000_correct := (SO4_81_11000 * IE * RIE_SO4 * AB_ref_correct) / (IE_correct * RIE_SO4_correct * ABRefWave)] + + SO4_81_11000: + num: [*factors.IE, *factors.RIE_SO4, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.RIE_SO4_correct, *factors.ABRefWave] + + # all_dat[, SO4_82_11000_correct := (SO4_82_11000 * IE * RIE_SO4 * AB_ref_correct) / (IE_correct * RIE_SO4_correct * ABRefWave)] + + SO4_82_11000: + num: [*factors.IE, *factors.RIE_SO4, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.RIE_SO4_correct, *factors.ABRefWave] + + # all_dat[, SO4_62_11000_correct := (SO4_62_11000 * IE * RIE_SO4 * AB_ref_correct) / (IE_correct * RIE_SO4_correct * ABRefWave)] + + SO4_62_11000: + num: [*factors.IE, *factors.RIE_SO4, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.RIE_SO4_correct, *factors.ABRefWave] + + # all_dat[, SO4_48_11000_correct := (SO4_48_11000 * IE * RIE_SO4 * AB_ref_correct) / (IE_correct * RIE_SO4_correct * ABRefWave)] + + SO4_48_11000: + num: [*factors.IE, *factors.RIE_SO4, *factors.AB_ref_correct] + den: [*factors.IE_correct, *factors.RIE_SO4_correct, *factors.ABRefWave] + + + -#all_dat[, SO4_correct := (SO4_11000 * IE * RIE_SO4 * AB_ref_correct) / (IE_correct * RIE_SO4_correct * ABRefWave)]; -SO4_11000 : - num: { IE : 145.9, AB_ref_correct: 254000, RIE_SO4 : 0.63} - den: { IE_correct : 146.9, ABRefWave : 254001, RIE_SO4_correct : 0.73} -SO4_98_11000 : - num: { one : 1 } - den: { CE_annual_avg : 1, RIE_SO4_annual_avg : 1 } -#all_dat[, NH4_correct := (NH4_11000 * IE * RIE_NH4 * AB_ref_correct) / (IE_correct * RIE_NH4_correct * ABRefWave)]; -NH4_11000 : - num: { IE : 145.9, AB_ref_correct: 254000, RIE_NH4 : 3.495} - den: { IE_correct : 146.9, ABRefWave : 254001, RIE_NH4_correct : 3.595} -Org_44_110000 : - num: { one : 1} - den: { CE : 1, RIE_Org : 1.4 } diff --git a/pipelines/steps/apply_calibration_factors.py b/pipelines/steps/apply_calibration_factors.py index c4b1b41..7532034 100644 --- a/pipelines/steps/apply_calibration_factors.py +++ b/pipelines/steps/apply_calibration_factors.py @@ -49,29 +49,57 @@ def apply_calibration_factors(data_table, calibration_factors): # Loop through the column names in the data table for variable_name in new_data_table.select_dtypes(include=["number"]).columns: - if not variable_name in calibration_factors.keys(): # use standard calibration factor + if variable_name in calibration_factors['variables'].keys(): # use standard calibration factor # Extract numerator and denominator values - numerator = prod(value for key, value in calibration_factors['standard']['num'].items()) - denominator = prod(value for key, value in calibration_factors['standard']['den'].items()) + numerator = prod(calibration_factors[variable_name]['num']) + denominator = prod(calibration_factors[variable_name]['den']) + + # Apply calibration to each variable + new_data_table[variable_name] = new_data_table[variable_name].mul((numerator / denominator)) + + # Add renaming entry + variable_rename_dict[variable_name] = f"{variable_name}_correct" else: # use specifies dependent calibration factor + print(f'There is no calibration factors for variable {variable_name}. The variable will remain the same.') - #print(variable_name) - #print([key for key in calibration_factors[variable_name]]) - numerator = prod(value for key, value in calibration_factors[variable_name]['num'].items()) - denominator = prod(value for key, value in calibration_factors[variable_name]['den'].items()) - # Apply calibration to each variable - new_data_table[variable_name] = new_data_table[variable_name].mul((numerator / denominator)) - - # Add renaming entry - variable_rename_dict[variable_name] = f"{variable_name}_correct" # Rename the columns in the new data table new_data_table.rename(columns=variable_rename_dict, inplace=True) return new_data_table +def record_data_lineage(path_to_output_file, metadata): + + path_to_output_dir, output_file = os.path.split(path_to_output_file) + + path_to_metadata_file = '/'.join([path_to_output_dir,'data_lineage_metadata.json']) + # Ensure the file exists + if not os.path.exists(path_to_metadata_file): + with open(path_to_metadata_file, 'w') as f: + json.dump({}, f) # Initialize empty JSON + + # Read the existing JSON + with open(path_to_metadata_file, 'r') as metadata_file: + try: + json_dict = json.load(metadata_file) + except json.JSONDecodeError: + json_dict = {} # Start fresh if file is invalid + + # Compute relative output file path and update the JSON object + relpath_to_output_file = os.path.relpath(path_to_output_file, start=projectPath).replace(os.sep, '/') + json_dict[relpath_to_output_file] = metadata + + # Write updated JSON back to the file + with open(path_to_metadata_file, 'w') as metadata_file: + json.dump(json_dict, metadata_file, indent=4) + + + print(f"Metadata for calibrated data saved to {path_to_metadata_file}") + + return 0 + if __name__ == '__main__': # Set up argument parsing @@ -130,8 +158,8 @@ if __name__ == '__main__': path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir,parent_instrument,parent_file])) path_to_calibrated_file = ''.join([path_to_output_file, '_calibrated.csv']) - path_tail, path_head = os.path.split(path_to_calibrated_file) - path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json']) + #path_tail, path_head = os.path.split(path_to_calibrated_file) + #path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json']) print('Path to output file :', path_to_calibrated_file) import dima.utils.g5505_utils as utils @@ -139,29 +167,11 @@ if __name__ == '__main__': calibrated_table = apply_calibration_factors(data_table, calibration_factors) metadata['processing_date'] = utils.created_at() calibrated_table.to_csv(path_to_calibrated_file, index=False) + + status = record_data_lineage(path_to_calibrated_file, metadata) - # Ensure the file exists - if not os.path.exists(path_to_metadata_file): - with open(path_to_metadata_file, 'w') as f: - json.dump({}, f) # Initialize empty JSON - - # Read the existing JSON - with open(path_to_metadata_file, 'r') as metadata_file: - try: - json_dict = json.load(metadata_file) - except json.JSONDecodeError: - json_dict = {} # Start fresh if file is invalid - - # Update the JSON object - outputfileRelPath = os.path.relpath(path_to_calibrated_file, start=projectPath).replace(os.sep, '/') - json_dict[outputfileRelPath] = metadata - - # Write updated JSON back to the file - with open(path_to_metadata_file, 'w') as metadata_file: - json.dump(json_dict, metadata_file, indent=4) print(f"Calibrated data saved to {path_to_calibrated_file}") - print(f"Metadata for calibrated data saved to {path_to_metadata_file}") except Exception as e: print(f"Error during calibration: {e}") exit(1) \ No newline at end of file